package sputils

import (
	"fmt"
	"gitee.com/ymofen/gobase/subpub"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

/*
支持使用路由进行订阅 # 匹配下级所有， +匹配当前层
topic/news

10000 topic

pub:
sync.map
lk:
	utils_subscribe_v2._test.go:168: 25505, 5094/s
	utils_subscribe_v2._test.go:168: 50409, 5035/s

nolk:
	utils_subscribe_v2._test.go:168: 25767, 5135/s
	utils_subscribe_v2._test.go:168: 51914, 5175/s

map with lk
		utils_subscribe_v2._test.go:168: 34228, 6844/s
		utils_subscribe_v2._test.go:168: 68952, 6894/s

map with lk
	    utils_subscribe_v2._test.go:168: 35343, 6977/s
	    utils_subscribe_v2._test.go:168: 71194, 7072/s

sesslst -> map
    utils_subscribe_v2._test.go:168: 36668, 7321/s
    utils_subscribe_v2._test.go:168: 74022, 7383/s
*/

type ChannelSubPublisher interface {
	Pub(max int, args ...interface{}) int

	Sub(id string, cb subpub.SubFunc) int
	Unsub(id string) (r int)
}

type channelSubSession struct {
	// 引用计数(可能多个主题匹配到同一个通道)
	refN int32

	// 订阅者
	id string
	fn subpub.SubFunc
}

// 发布通道
type channelItem struct {
	refN          int32
	owner         *Subscribe
	id            string
	matchTopic    interface{}
	sesslst       map[string]*channelSubSession
	idlst         []string
	fnlst         []subpub.SubFunc
	lastActivityT int64
}

// 订阅主题
type subTopicItem struct {
	subtopicid string
	subtopic   interface{}
	sesslst    map[string]subpub.SubFunc
}

// 订阅中心,
// 订阅主题:支持通配符/news/+/qq, 将会收到匹配的的通道消息
// PUB消息通道不支持通配符
type Subscribe struct {
	lockcnt int32

	lk sync.RWMutex
	// 通道列表
	channellst map[string]*channelItem

	// 订阅主题列表
	subTopiclst    map[string]*subTopicItem
	routeParseFunc func(topic string) interface{}                  // 必须确保有值
	routeMatchFunc func(route interface{}, topic interface{}) bool // 必须确保有值
}

var (
	DefaultSubscribe = NewSubscribe()
)

func innerParseRoute(route string) interface{} {
	return strings.Split(route, "/")
}

func innerRouteIncludesTopic0(route []string, topic []string) bool {
	if len(route) == 0 {
		return len(topic) == 0
	}

	if len(topic) == 0 {
		return route[0] == "#"
	}

	if route[0] == "#" {
		return true
	}

	if (route[0] == "+") || (route[0] == topic[0]) {
		return innerRouteIncludesTopic0(route[1:], topic[1:])
	}
	return false
}

/*
1000W:consume:730(ms)
*/
func innerRouteIncludesTopic(route interface{}, topic interface{}) bool {
	return innerRouteIncludesTopic0(route.([]string), topic.([]string))
}

func NewSubscribe() *Subscribe {
	return &Subscribe{
		channellst:     make(map[string]*channelItem),
		subTopiclst:    make(map[string]*subTopicItem),
		routeParseFunc: innerParseRoute,
		routeMatchFunc: innerRouteIncludesTopic,
	}
}

func NewSubscribeEx(routeParseFunc func(topic string) interface{}, routeMatchFunc func(route interface{}, topic interface{}) bool) *Subscribe {
	if routeParseFunc == nil {
		panic("invalid routeParseFunc")
	}
	if routeMatchFunc == nil {
		panic("invalid routeMatch")
	}
	return &Subscribe{
		channellst:     make(map[string]*channelItem),
		subTopiclst:    make(map[string]*subTopicItem),
		routeParseFunc: routeParseFunc,
		routeMatchFunc: routeMatchFunc,
	}
}

// 创建并增加引用通道的引用计数,
//
//	CheckGetPubChannelAddRef/CheckClosePubChannelDecRef 必须配套调用否则会造成通道泄漏
func (this *Subscribe) CheckGetPubChannelAddRef(channel string) ChannelSubPublisher {
	this.lk.Lock()
	atomic.AddInt32(&this.lockcnt, 1)
	itm := this.innerCheckCreateChannel(channel)
	itm.refN++
	this.lk.Unlock()
	return itm
}

// 释放通道引用计数, 如果引用计数:0 则进行释放通道
func (this *Subscribe) CheckClosePubChannelDecRef(ch ChannelSubPublisher) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)
	if itm, ok := ch.(*channelItem); ok {
		itm.refN--
		if itm.refN == 0 {
			delete(this.channellst, itm.id)
			return true
		}
	}
	return false
}

// 直接释放通道
func (this *Subscribe) ClosePubChannel(channel string) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)
	itm := this.channellst[channel]
	return this.innerFreeChannel(itm)
}

func (this *Subscribe) Close() error {
	this.lk.Lock()
	defer this.lk.Unlock()
	for k, _ := range this.subTopiclst {
		delete(this.subTopiclst, k)
	}
	return nil
}

func (this *Subscribe) matchSubTopic(topics interface{}, fn func(itm *subTopicItem) bool) {
	this.rangeSubTopics(func(key string, itm *subTopicItem) bool {
		if this.routeMatchFunc(itm.subtopic, topics) {
			return fn(itm)
		}
		return true
	})
}

func (this *Subscribe) checkGetTopic(topic string, new bool) *subTopicItem {
	itm := this.subTopiclst[topic]
	if new && itm == nil {
		itm = &subTopicItem{subtopicid: topic, subtopic: this.routeParseFunc(topic), sesslst: make(map[string]subpub.SubFunc)}
		this.subTopiclst[topic] = itm
	}
	return itm
}

func (this *Subscribe) Status() string {
	this.lk.RLock()
	defer this.lk.RUnlock()
	return fmt.Sprintf("subTopicN:%d, channel:%d, topicSessN:%d", len(this.subTopiclst), len(this.channellst), this.sessionCount())
}

func (this *Subscribe) GetTopicSubCount(topic string) int {
	this.lk.RLock()
	defer this.lk.RUnlock()
	itm := this.checkGetTopic(topic, false)
	if itm == nil {
		return 0
	}
	return len(itm.sesslst)
}

func (this *Subscribe) TopicCount() int {
	this.lk.RLock()
	defer this.lk.RUnlock()
	return len(this.subTopiclst)
}

func (this *Subscribe) rangeSubTopics(fn func(key string, itm *subTopicItem) bool) {
	for k, v := range this.subTopiclst {
		if !fn(k, v) {
			break
		}
	}
}

func (this *Subscribe) sessionCount() int {
	n := 0
	this.rangeSubTopics(func(key string, itm *subTopicItem) bool {
		n += len(itm.sesslst)
		return true
	})
	return n
}

func (this *Subscribe) Count() int {
	this.lk.RLock()
	defer this.lk.RUnlock()
	return this.sessionCount()
}

func (this *Subscribe) StatusDetail() string {
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("topic:%d", this.TopicCount()))
	lst := make([]string, 0, 1024)
	this.rangeSubTopics(func(key string, itm *subTopicItem) bool {
		lst = append(lst, key)
		return true
	})
	sort.Strings(lst)
	for i := 0; i < len(lst); i++ {
		itm := this.checkGetTopic(lst[i], false)
		if itm != nil {
			sb.WriteString(fmt.Sprintf(",%s:%d", lst[i], len(itm.sesslst)))
		}
	}
	return sb.String()
}

// 添加订阅者(subid)到通道(channel)
// changed: true表示通道中订阅者有新增
func (this *Subscribe) addSub2Channel(channel *channelItem, subid string, cb subpub.SubFunc) (changed bool) {
	itm := channel.sesslst[subid]
	if itm == nil {
		itm = &channelSubSession{id: subid, fn: cb}
		channel.sesslst[subid] = itm
		changed = true
	}
	itm.refN++
	return
}

// 从通道中减少一个订阅者(subid)引用, 如果引用为:0, 则移除订阅者, 返回:true
func (this *Subscribe) releaseTopic(channel *channelItem, subid string) (changed bool) {
	itm := channel.sesslst[subid]
	if itm == nil {
		return false
	}
	itm.refN--
	if itm.refN == 0 {
		delete(channel.sesslst, subid)
		return true
	}
	return false
}

// lk must be held
// 添加订阅者(subid)到所有匹配的通道, (可能多个主题匹配到同一个通道)
func (this *Subscribe) innerAdd2Channel(id string, topic interface{}, cb subpub.SubFunc) {
	for _, v := range this.channellst {
		if this.routeMatchFunc(topic, v.matchTopic) {
			if this.addSub2Channel(v, id, cb) { // 同一个通道下, 同一个订阅者(id), 确保只订阅一次
				this.innerReloadSubSessionFnlst(v)
			}
			v.refN++
		}
	}
}

// 从匹配的通道中, 移除一次订阅者(subid)
func (this *Subscribe) innerRemoveFromChannel(id string, topic interface{}) (cnt int) {
	for _, v := range this.channellst {
		if this.routeMatchFunc(topic, v.matchTopic) {
			if this.releaseTopic(v, id) {
				cnt++
				this.innerReloadSubSessionFnlst(v)
			}
			v.refN--
		}
	}
	return
}

func (this *Subscribe) innerCheckCreateChannel(channel string) *channelItem {
	itm := this.channellst[channel]
	if itm != nil {
		return itm
	}
	itm = &channelItem{owner: this, matchTopic: this.routeParseFunc(channel), id: channel, sesslst: make(map[string]*channelSubSession)}
	this.channellst[channel] = itm
	this.innerCollectSubSession(itm)
	return itm
}

func (this *Subscribe) innerReloadSubSessionFnlst(itm *channelItem) {
	fnlst := make([]subpub.SubFunc, len(itm.sesslst))
	idlst := make([]string, len(itm.sesslst))

	i := 0
	for _, s := range itm.sesslst { // 所有session都添加进去
		if s.fn == nil {
			panic("callback is nil")
		}
		fnlst[i] = s.fn
		idlst[i] = s.id
		i++
	}

	itm.fnlst, itm.idlst = fnlst, idlst
}

func (this *Subscribe) innerCollectSubSession(itm *channelItem) {
	for _, v := range this.subTopiclst {
		if this.routeMatchFunc(v.subtopic, itm.matchTopic) {
			for sid, sfn := range v.sesslst { // 所有session都添加进去
				this.addSub2Channel(itm, sid, sfn)
			}
			itm.refN++
		}
	}
	this.innerReloadSubSessionFnlst(itm)
}

// 订阅一个主题
//
// topic订阅主题,为空不进行订阅
// id订阅者id,topic 下id重复将会被覆盖(之前订阅失效)
func (this *Subscribe) Sub(id, topic string, cb subpub.SubFunc) int {
	if len(topic) == 0 {
		return 0
	}
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)
	itm := this.checkGetTopic(topic, true)
	itm.sesslst[id] = cb
	this.innerAdd2Channel(id, itm.subtopic, cb)
	return len(itm.sesslst)
}

// 取消订阅
// id 订阅时传入的id
// topic订阅的主题
// r: 返回取消订阅后, topic下订阅者数量
// changed: true表示topic主题下订阅者数量发生变化
func (this *Subscribe) Unsub(id, topic string) (r int, changed bool) {
	if len(topic) == 0 {
		return -1, false
	}
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)
	itm := this.checkGetTopic(topic, false)
	if itm != nil {
		r0 := len(itm.sesslst)
		delete(itm.sesslst, id)
		r = len(itm.sesslst)
		if r == 0 {
			delete(this.subTopiclst, topic)
		}
		this.innerRemoveFromChannel(id, itm.subtopic)
		return r, r != r0
	}
	return -1, false
}

// >1 投递成功max次后停止
// 循环map
//
//	utils_subscribe_v2._test.go:168: 34228, 6844/s
//	utils_subscribe_v2._test.go:168: 68952, 6894/s
//
// 循环lst
//
//	utils_subscribe_v2._test.go:168: 35343, 6977/s
//	utils_subscribe_v2._test.go:168: 71194, 7072/s
//
// 向主题订阅者推送数据
// topic推送的主题
// max最大接收者,超过该值不再进行推送
//
//max:0, 全部投递
//max:0, 全部投递
func (this *Subscribe) Pub(channel string, max int, args ...interface{}) int {
	n := 0
	var fnlst []subpub.SubFunc
	var idlst []string
	this.lk.RLock()
	itm := this.channellst[channel]
	if itm != nil {
		fnlst, idlst = itm.fnlst, itm.idlst
		itm.lastActivityT = time.Now().Unix()
	}
	this.lk.RUnlock()
	if itm == nil {
		this.lk.Lock()
		atomic.AddInt32(&this.lockcnt, 1)
		itm = this.innerCheckCreateChannel(channel)
		fnlst, idlst = itm.fnlst, itm.idlst
		itm.lastActivityT = time.Now().Unix()
		this.lk.Unlock()
	}

	if len(idlst) != len(fnlst) {
		return -1
	}

	for idx, fn := range fnlst {
		if fn(idlst[idx], channel, args...) {
			n++
			if max > 0 && n >= max {
				break
			}
		}
	}
	return n
}

// 通道必须确保不被释放,
//
//	sub/pub 必须配套使用, 否则通道的生命周期会混乱
func (this *channelItem) Sub(id string, cb subpub.SubFunc) int {
	this.owner.lk.Lock()
	defer this.owner.lk.Unlock()
	atomic.AddInt32(&this.owner.lockcnt, 1)
	if this.owner.addSub2Channel(this, id, cb) {
		this.owner.innerReloadSubSessionFnlst(this)
	}
	this.refN++
	return len(this.idlst)
}

// 通道必须确保不被释放
func (this *channelItem) Unsub(id string) int {
	this.owner.lk.Lock()
	defer this.owner.lk.Unlock()
	atomic.AddInt32(&this.owner.lockcnt, 1)
	if this.owner.releaseTopic(this, id) {
		this.owner.innerReloadSubSessionFnlst(this)
	}
	this.refN--
	return len(this.idlst)
}

// 通道必须确保不被释放
func (this *channelItem) Pub(max int, args ...interface{}) (n int) {
	fnlst, idlst := this.fnlst, this.idlst
	this.lastActivityT = time.Now().Unix()

	for idx, fn := range fnlst {
		if fn(idlst[idx], this.id, args...) {
			n++
			if max > 0 && n >= max {
				break
			}
		}
	}
	return n
}

func (this *Subscribe) innerFreeChannel(itm *channelItem) bool {
	if itm == nil || len(itm.sesslst) > 0 {
		return false
	}
	delete(this.channellst, itm.id)
	itm.matchTopic = nil
	itm.idlst = nil
	itm.sesslst = nil
	itm.fnlst = nil
	itm.idlst = nil
	return true
}

//// 释放通道
//func (this *Subscribe) ClosePubChannel(channel string) (closed bool) {
//	this.lk.Lock()
//	defer this.lk.Unlock()
//
//	atomic.AddInt32(&this.lockcnt, 1)
//	itm := this.channellst[channel]
//	return this.innerFreeChannel(itm)
//}

// 清理一些超时10分钟没有发布消息的通道
func (this *Subscribe) CleanChannels() (cnt int) {
	t := time.Now().Unix()
	var lst []*channelItem
	this.lk.RLock()
	for _, itm := range this.channellst {
		if itm.refN <= 0 && t-itm.lastActivityT > 600 { // 10分钟没有发布数据, 进行清理
			lst = append(lst, itm)
		}
	}
	lst = append(lst)
	this.lk.RUnlock()

	if len(lst) > 0 {
		this.lk.Lock()
		defer this.lk.Unlock()
		for i := 0; i < len(lst); i++ {
			if this.innerFreeChannel(lst[i]) {
				cnt++
			}
		}
	}
	return

}
